diff --git a/cypress/integration/api-tokens.spec.js b/cypress/integration/api-tokens.spec.js index 90ac8e1c..84acf2cd 100644 --- a/cypress/integration/api-tokens.spec.js +++ b/cypress/integration/api-tokens.spec.js @@ -1,211 +1,211 @@ /** * Copyright (C) 2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ describe('Test API tokens UI', function() { it('should ask for user to login', function() { - cy.visit(this.Urls.api_tokens(), {failOnStatusCode: false}); + cy.visit(`${this.Urls.oidc_profile()}#tokens`, {failOnStatusCode: false}); cy.location().should(loc => { expect(loc.pathname).to.eq(this.Urls.oidc_login()); }); }); function initTokensPage(Urls, tokens) { cy.server(); cy.route({ method: 'GET', url: `${Urls.oidc_list_bearer_tokens()}/**`, response: { 'recordsTotal': tokens.length, 'draw': 2, 'recordsFiltered': tokens.length, 'data': tokens } }); // the tested UI should not be accessible for standard Django users // but we need a user logged in for testing it cy.adminLogin(); - cy.visit(Urls.api_tokens()); + cy.visit(`${Urls.oidc_profile()}#tokens`); } function generateToken(Urls, status, tokenValue = '') { cy.route({ method: 'POST', url: `${Urls.oidc_generate_bearer_token()}/**`, response: tokenValue, status: status }).as('generateTokenRequest'); cy.contains('Generate new token') .click(); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Bearer token generation'); cy.get('#swh-user-password-submit') .should('be.disabled'); cy.get('#swh-user-password') .type('secret'); cy.get('#swh-user-password-submit') .should('be.enabled'); cy.get('#swh-user-password-submit') .click(); cy.wait('@generateTokenRequest'); if (status === 200) { cy.get('#swh-user-password-submit') .should('be.disabled'); } } it('should generate and display bearer token', function() { initTokensPage(this.Urls, []); const tokenValue = 'bearer-token-value'; generateToken(this.Urls, 200, tokenValue); cy.get('#swh-token-success-message') .should('contain', 'Below is your token'); cy.get('#swh-bearer-token') .should('contain', tokenValue); }); it('should report errors when token generation failed', function() { initTokensPage(this.Urls, []); generateToken(this.Urls, 400); cy.get('#swh-token-error-message') .should('contain', 'You are not allowed to generate bearer tokens'); cy.get('#swh-web-modal-html .close').click(); generateToken(this.Urls, 401); cy.get('#swh-token-error-message') .should('contain', 'The password is invalid'); cy.get('#swh-web-modal-html .close').click(); generateToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); function displayToken(Urls, status, tokenValue = '') { cy.route({ method: 'POST', url: `${Urls.oidc_get_bearer_token()}/**`, response: tokenValue, status: status }).as('getTokenRequest'); cy.contains('Display token') - .click(); + .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Display bearer token'); cy.get('#swh-user-password-submit') .should('be.disabled'); cy.get('#swh-user-password') - .type('secret'); + .type('secret', {force: true}); cy.get('#swh-user-password-submit') .should('be.enabled'); cy.get('#swh-user-password-submit') - .click(); + .click({force: true}); cy.wait('@getTokenRequest'); if (status === 200) { cy.get('#swh-user-password-submit') .should('be.disabled'); } } it('should show a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); const tokenValue = 'token-value'; displayToken(this.Urls, 200, tokenValue); cy.get('#swh-token-success-message') .should('contain', 'Below is your token'); cy.get('#swh-bearer-token') .should('contain', tokenValue); }); it('should report errors when token display failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); displayToken(this.Urls, 401); cy.get('#swh-token-error-message') .should('contain', 'The password is invalid'); cy.get('#swh-web-modal-html .close').click(); displayToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); function revokeToken(Urls, status) { cy.route({ method: 'POST', url: `${Urls.oidc_revoke_bearer_tokens()}/**`, response: '', status: status }).as('revokeTokenRequest'); cy.contains('Revoke token') - .click(); + .click({force: true}); cy.get('.modal-dialog') .should('be.visible'); cy.get('.modal-header') .should('contain', 'Revoke bearer token'); cy.get('#swh-user-password-submit') .should('be.disabled'); cy.get('#swh-user-password') - .type('secret'); + .type('secret', {force: true}); cy.get('#swh-user-password-submit') .should('be.enabled'); cy.get('#swh-user-password-submit') - .click(); + .click({force: true}); cy.wait('@revokeTokenRequest'); if (status === 200) { cy.get('#swh-user-password-submit') .should('be.disabled'); } } it('should revoke a token when requested', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 200); cy.get('#swh-token-success-message') .should('contain', 'Bearer token successfully revoked'); }); it('should report errors when token revoke failed', function() { initTokensPage(this.Urls, [{id: 1, creation_date: new Date().toISOString()}]); revokeToken(this.Urls, 401); cy.get('#swh-token-error-message') .should('contain', 'The password is invalid'); cy.get('#swh-web-modal-html .close').click(); revokeToken(this.Urls, 500); cy.get('#swh-token-error-message') .should('contain', 'Internal server error'); }); }); diff --git a/swh/web/api/urls.py b/swh/web/api/urls.py index dea648c9..9033c291 100644 --- a/swh/web/api/urls.py +++ b/swh/web/api/urls.py @@ -1,31 +1,20 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.conf.urls import url -from django.contrib.auth.decorators import login_required -from django.shortcuts import render - from swh.web.api.apiurls import APIUrls import swh.web.api.views.content # noqa import swh.web.api.views.directory # noqa import swh.web.api.views.graph # noqa import swh.web.api.views.identifiers # noqa import swh.web.api.views.origin # noqa import swh.web.api.views.origin_save # noqa import swh.web.api.views.ping # noqa import swh.web.api.views.release # noqa import swh.web.api.views.revision # noqa import swh.web.api.views.snapshot # noqa import swh.web.api.views.stat # noqa import swh.web.api.views.vault # noqa - -@login_required(login_url="/oidc/login/", redirect_field_name="next_path") -def _tokens_view(request): - return render(request, "api/tokens.html") - - urlpatterns = APIUrls.get_url_patterns() -urlpatterns.append(url(r"^tokens/$", _tokens_view, name="api-tokens")) diff --git a/swh/web/assets/src/bundles/auth/index.js b/swh/web/assets/src/bundles/auth/index.js index 1b67e5e1..afcf5fe6 100644 --- a/swh/web/assets/src/bundles/auth/index.js +++ b/swh/web/assets/src/bundles/auth/index.js @@ -1,216 +1,226 @@ /** * Copyright (C) 2020 The Software Heritage developers * See the AUTHORS file at the top-level directory of this distribution * License: GNU Affero General Public License version 3, or any later version * See top-level LICENSE file for more information */ -import {handleFetchError, csrfPost} from 'utils/functions'; +import {handleFetchError, csrfPost, removeUrlFragment} from 'utils/functions'; import './auth.css'; let apiTokensTable; function updateSubmitButtonState() { const val = $('#swh-user-password').val(); $('#swh-user-password-submit').prop('disabled', val.length === 0); } function passwordForm(infoText, buttonText) { const form = `

${infoText}

`; return form; } function errorMessage(message) { return `

${message}

`; } function successMessage(message) { return `

${message}

`; } function disableSubmitButton() { $('#swh-user-password-submit').prop('disabled', true); $('#swh-user-password').off('change'); $('#swh-user-password').off('keyup'); } function generateToken() { csrfPost(Urls.oidc_generate_bearer_token(), {}, JSON.stringify({password: $('#swh-user-password').val()})) .then(handleFetchError) .then(response => response.text()) .then(token => { disableSubmitButton(); const tokenHtml = `${successMessage('Below is your token.')}
${token}
`; $(`#swh-password-form`).append(tokenHtml); apiTokensTable.draw(); }) .catch(response => { if (response.status === 400) { $(`#swh-password-form`).append( errorMessage('You are not allowed to generate bearer tokens.')); } else if (response.status === 401) { $(`#swh-password-form`).append(errorMessage('The password is invalid.')); } else { $(`#swh-password-form`).append(errorMessage('Internal server error.')); } }); } function displayToken(tokenId) { const postData = { password: $('#swh-user-password').val(), token_id: tokenId }; csrfPost(Urls.oidc_get_bearer_token(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(response => response.text()) .then(token => { disableSubmitButton(); const tokenHtml = `${successMessage('Below is your token.')}
${token}
`; $(`#swh-password-form`).append(tokenHtml); }) .catch(response => { if (response.status === 401) { $(`#swh-password-form`).append(errorMessage('The password is invalid.')); } else { $(`#swh-password-form`).append(errorMessage('Internal server error.')); } }); } function revokeTokens(tokenIds) { const postData = { password: $('#swh-user-password').val(), token_ids: tokenIds }; csrfPost(Urls.oidc_revoke_bearer_tokens(), {}, JSON.stringify(postData)) .then(handleFetchError) .then(() => { disableSubmitButton(); $(`#swh-password-form`).append( successMessage(`Bearer token${tokenIds.length > 1 ? 's' : ''} successfully revoked`)); apiTokensTable.draw(); }) .catch(response => { if (response.status === 401) { $(`#swh-password-form`).append(errorMessage('The password is invalid.')); } else { $(`#swh-password-form`).append(errorMessage('Internal server error.')); } }); } function revokeToken(tokenId) { revokeTokens([tokenId]); } function revokeAllTokens() { const tokenIds = []; const rowsData = apiTokensTable.rows().data(); for (let i = 0; i < rowsData.length; ++i) { tokenIds.push(rowsData[i].id); } revokeTokens(tokenIds); } export function applyTokenAction(action, tokenId) { const actionData = { generate: { modalTitle: 'Bearer token generation', infoText: 'Enter your password and click on the button to generate the token.', buttonText: 'Generate token', submitCallback: generateToken }, display: { modalTitle: 'Display bearer token', infoText: 'Enter your password and click on the button to display the token.', buttonText: 'Display token', submitCallback: displayToken }, revoke: { modalTitle: 'Revoke bearer token', infoText: 'Enter your password and click on the button to revoke the token.', buttonText: 'Revoke token', submitCallback: revokeToken }, revokeAll: { modalTitle: 'Revoke all bearer tokens', infoText: 'Enter your password and click on the button to revoke all tokens.', buttonText: 'Revoke tokens', submitCallback: revokeAllTokens } }; if (!actionData[action]) { return; } const passwordFormHtml = passwordForm( actionData[action].infoText, actionData[action].buttonText); swh.webapp.showModalHtml(actionData[action].modalTitle, passwordFormHtml); $('#swh-user-password').change(updateSubmitButtonState); $('#swh-user-password').keyup(updateSubmitButtonState); $(`#swh-password-form`).submit(event => { event.preventDefault(); event.stopPropagation(); actionData[action].submitCallback(tokenId); }); } -export function initApiTokensPage() { +export function initProfilePage() { $(document).ready(() => { apiTokensTable = $('#swh-bearer-tokens-table') .on('error.dt', (e, settings, techNote, message) => { $('#swh-origin-save-request-list-error').text( 'An error occurred while retrieving the tokens list'); console.log(message); }) .DataTable({ serverSide: true, ajax: Urls.oidc_list_bearer_tokens(), columns: [ { data: 'creation_date', name: 'creation_date', render: (data, type, row) => { if (type === 'display') { let date = new Date(data); return date.toLocaleString(); } return data; } }, { render: (data, type, row) => { const html = ` `; return html; } } ], ordering: false, searching: false, scrollY: '50vh', scrollCollapse: true }); + $('#swh-oidc-profile-tokens-tab').on('shown.bs.tab', () => { + apiTokensTable.draw(); + window.location.hash = '#tokens'; + }); + $('#swh-oidc-profile-account-tab').on('shown.bs.tab', () => { + removeUrlFragment(); + }); + if (window.location.hash === '#tokens') { + $('.nav-tabs a[href="#swh-oidc-profile-tokens"]').tab('show'); + } }); } diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py index 9e3d8615..8f019a99 100644 --- a/swh/web/auth/views.py +++ b/swh/web/auth/views.py @@ -1,243 +1,251 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, cast import uuid from cryptography.fernet import InvalidToken from keycloak.exceptions import KeycloakError import sentry_sdk from django.conf.urls import url from django.contrib.auth import authenticate, login, logout +from django.contrib.auth.decorators import login_required from django.core.cache import cache from django.core.paginator import Paginator from django.http import HttpRequest from django.http.response import ( HttpResponse, HttpResponseForbidden, HttpResponseRedirect, HttpResponseServerError, JsonResponse, ) +from django.shortcuts import render from django.views.decorators.http import require_http_methods from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import ( decrypt_data, encrypt_data, gen_oidc_pkce_codes, get_oidc_client, ) from swh.web.common.exc import BadInputExc from swh.web.common.utils import reverse def oidc_login(request: HttpRequest) -> HttpResponse: """ Django view to initiate login process using OpenID Connect. """ # generate a CSRF token state = str(uuid.uuid4()) redirect_uri = reverse("oidc-login-complete", request=request) code_verifier, code_challenge = gen_oidc_pkce_codes() request.session["login_data"] = { "code_verifier": code_verifier, "state": state, "redirect_uri": redirect_uri, "next_path": request.GET.get("next_path", ""), "prompt": request.GET.get("prompt", ""), } authorization_url_params = { "state": state, "code_challenge": code_challenge, "code_challenge_method": "S256", "scope": "openid", "prompt": request.GET.get("prompt", ""), } oidc_client = get_oidc_client() authorization_url = oidc_client.authorization_url( redirect_uri, **authorization_url_params ) return HttpResponseRedirect(authorization_url) def oidc_login_complete(request: HttpRequest) -> HttpResponse: """ Django view to finalize login process using OpenID Connect. """ if "login_data" not in request.session: raise Exception("Login process has not been initialized.") login_data = request.session["login_data"] next_path = login_data["next_path"] or request.build_absolute_uri("/") if "error" in request.GET: if login_data["prompt"] == "none": # Silent login failed because OIDC session expired. # Redirect to logout page and inform user. logout(request) logout_url = reverse( "logout", query_params={"next_path": next_path, "remote_user": 1} ) return HttpResponseRedirect(logout_url) return HttpResponseServerError(request.GET["error"]) if "code" not in request.GET or "state" not in request.GET: raise BadInputExc("Missing query parameters for authentication.") # get CSRF token returned by OIDC server state = request.GET["state"] if state != login_data["state"]: raise BadInputExc("Wrong CSRF token, aborting login process.") user = authenticate( request=request, code=request.GET["code"], code_verifier=login_data["code_verifier"], redirect_uri=login_data["redirect_uri"], ) if user is None: raise Exception("User authentication failed.") login(request, user) return HttpResponseRedirect(next_path) def oidc_logout(request: HttpRequest) -> HttpResponse: """ Django view to logout using OpenID Connect. """ user = request.user logout(request) if hasattr(user, "refresh_token"): oidc_client = get_oidc_client() user = cast(OIDCUser, user) refresh_token = cast(str, user.refresh_token) # end OpenID Connect session oidc_client.logout(refresh_token) # remove user data from cache cache.delete(f"oidc_user_{user.id}") logout_url = reverse("logout", query_params={"remote_user": 1}) return HttpResponseRedirect(request.build_absolute_uri(logout_url)) @require_http_methods(["POST"]) def oidc_generate_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("utf-8")) user = cast(OIDCUser, request.user) oidc_client = get_oidc_client() token = oidc_client.offline_token(user.username, data["password"]) password = data["password"].encode() salt = user.sub.encode() encrypted_token = encrypt_data(token.encode(), password, salt) OIDCUserOfflineTokens.objects.create( user_id=str(user.id), offline_token=encrypted_token ).save() return HttpResponse(token, content_type="text/plain") except KeycloakError as e: sentry_sdk.capture_exception(e) return HttpResponse(status=e.response_code or 500) def oidc_list_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() tokens = OIDCUserOfflineTokens.objects.filter(user_id=str(request.user.id)) tokens = tokens.order_by("-creation_date") length = int(request.GET["length"]) page = int(request.GET["start"]) / length + 1 paginator = Paginator(tokens, length) tokens_data = [ {"id": t.id, "creation_date": t.creation_date.isoformat()} for t in paginator.page(int(page)).object_list ] table_data: Dict[str, Any] = {} table_data["recordsTotal"] = len(tokens_data) table_data["draw"] = int(request.GET["draw"]) table_data["data"] = tokens_data table_data["recordsFiltered"] = len(tokens_data) return JsonResponse(table_data) @require_http_methods(["POST"]) def oidc_get_bearer_token(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("utf-8")) user = cast(OIDCUser, request.user) token_data = OIDCUserOfflineTokens.objects.get(id=data["token_id"]) password = data["password"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, password, salt) return HttpResponse(decrypted_token.decode("ascii"), content_type="text/plain") except InvalidToken: return HttpResponse(status=401) @require_http_methods(["POST"]) def oidc_revoke_bearer_tokens(request: HttpRequest) -> HttpResponse: if not request.user.is_authenticated or not isinstance(request.user, OIDCUser): return HttpResponseForbidden() try: data = json.loads(request.body.decode("utf-8")) user = cast(OIDCUser, request.user) for token_id in data["token_ids"]: token_data = OIDCUserOfflineTokens.objects.get(id=token_id) password = data["password"].encode() salt = user.sub.encode() decrypted_token = decrypt_data(token_data.offline_token, password, salt) oidc_client = get_oidc_client() oidc_client.logout(decrypted_token.decode("ascii")) token_data.delete() return HttpResponse(status=200) except InvalidToken: return HttpResponse(status=401) +@login_required(login_url="/oidc/login/", redirect_field_name="next_path") +def _oidc_profile_view(request: HttpRequest) -> HttpResponse: + return render(request, "auth/profile.html") + + urlpatterns = [ url(r"^oidc/login/$", oidc_login, name="oidc-login"), url(r"^oidc/login-complete/$", oidc_login_complete, name="oidc-login-complete"), url(r"^oidc/logout/$", oidc_logout, name="oidc-logout"), url( r"^oidc/generate-bearer-token/$", oidc_generate_bearer_token, name="oidc-generate-bearer-token", ), url( r"^oidc/list-bearer-token/$", oidc_list_bearer_tokens, name="oidc-list-bearer-tokens", ), url( r"^oidc/get-bearer-token/$", oidc_get_bearer_token, name="oidc-get-bearer-token", ), url( r"^oidc/revoke-bearer-tokens/$", oidc_revoke_bearer_tokens, name="oidc-revoke-bearer-tokens", ), + url(r"^oidc/profile/$", _oidc_profile_view, name="oidc-profile",), ] diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py index 09d5109c..88bf56ce 100644 --- a/swh/web/common/utils.py +++ b/swh/web/common/utils.py @@ -1,350 +1,351 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import re from typing import Any, Dict, Optional from bs4 import BeautifulSoup from docutils.core import publish_parts import docutils.parsers.rst import docutils.utils from docutils.writers.html5_polyglot import HTMLTranslator, Writer from iso8601 import ParseError, parse_date from prometheus_client.registry import CollectorRegistry from django.http import HttpRequest, QueryDict from django.urls import reverse as django_reverse from rest_framework.authentication import SessionAuthentication from swh.web.common.exc import BadInputExc from swh.web.common.typing import QueryParameters from swh.web.config import get_config SWH_WEB_METRICS_REGISTRY = CollectorRegistry(auto_describe=True) swh_object_icons = { "branch": "mdi mdi-source-branch", "branches": "mdi mdi-source-branch", "content": "mdi mdi-file-document", "directory": "mdi mdi-folder", "origin": "mdi mdi-source-repository", "person": "mdi mdi-account", "revisions history": "mdi mdi-history", "release": "mdi mdi-tag", "releases": "mdi mdi-tag", "revision": "mdi mdi-rotate-90 mdi-source-commit", "snapshot": "mdi mdi-camera", "visits": "mdi mdi-calendar-month", } def reverse( viewname: str, url_args: Optional[Dict[str, Any]] = None, query_params: Optional[QueryParameters] = None, current_app: Optional[str] = None, urlconf: Optional[str] = None, request: Optional[HttpRequest] = None, ) -> str: """An override of django reverse function supporting query parameters. Args: viewname: the name of the django view from which to compute a url url_args: dictionary of url arguments indexed by their names query_params: dictionary of query parameters to append to the reversed url current_app: the name of the django app tighten to the view urlconf: url configuration module request: build an absolute URI if provided Returns: str: the url of the requested view with processed arguments and query parameters """ if url_args: url_args = {k: v for k, v in url_args.items() if v is not None} url = django_reverse( viewname, urlconf=urlconf, kwargs=url_args, current_app=current_app ) if query_params: query_params = {k: v for k, v in query_params.items() if v is not None} if query_params and len(query_params) > 0: query_dict = QueryDict("", mutable=True) for k in sorted(query_params.keys()): query_dict[k] = query_params[k] url += "?" + query_dict.urlencode(safe="/;:") if request is not None: url = request.build_absolute_uri(url) return url def datetime_to_utc(date): """Returns datetime in UTC without timezone info Args: date (datetime.datetime): input datetime with timezone info Returns: datetime.datetime: datetime in UTC without timezone info """ if date.tzinfo and date.tzinfo != timezone.utc: return date.astimezone(tz=timezone.utc) else: return date def parse_iso8601_date_to_utc(iso_date: str) -> datetime: """Given an ISO 8601 datetime string, parse the result as UTC datetime. Returns: a timezone-aware datetime representing the parsed date Raises: swh.web.common.exc.BadInputExc: provided date does not respect ISO 8601 format Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - 2007-01-14T20:34:22Z """ try: date = parse_date(iso_date) return datetime_to_utc(date) except ParseError as e: raise BadInputExc(e) def shorten_path(path): """Shorten the given path: for each hash present, only return the first 8 characters followed by an ellipsis""" sha256_re = r"([0-9a-f]{8})[0-9a-z]{56}" sha1_re = r"([0-9a-f]{8})[0-9a-f]{32}" ret = re.sub(sha256_re, r"\1...", path) return re.sub(sha1_re, r"\1...", ret) def format_utc_iso_date(iso_date, fmt="%d %B %Y, %H:%M UTC"): """Turns a string representation of an ISO 8601 datetime string to UTC and format it into a more human readable one. For instance, from the following input string: '2017-05-04T13:27:13+02:00' the following one is returned: '04 May 2017, 11:27 UTC'. Custom format string may also be provided as parameter Args: iso_date (str): a string representation of an ISO 8601 date fmt (str): optional date formatting string Returns: str: a formatted string representation of the input iso date """ if not iso_date: return iso_date date = parse_iso8601_date_to_utc(iso_date) return date.strftime(fmt) def gen_path_info(path): """Function to generate path data navigation for use with a breadcrumb in the swh web ui. For instance, from a path /folder1/folder2/folder3, it returns the following list:: [{'name': 'folder1', 'path': 'folder1'}, {'name': 'folder2', 'path': 'folder1/folder2'}, {'name': 'folder3', 'path': 'folder1/folder2/folder3'}] Args: path: a filesystem path Returns: list: a list of path data for navigation as illustrated above. """ path_info = [] if path: sub_paths = path.strip("/").split("/") path_from_root = "" for p in sub_paths: path_from_root += "/" + p path_info.append({"name": p, "path": path_from_root.strip("/")}) return path_info def parse_rst(text, report_level=2): """ Parse a reStructuredText string with docutils. Args: text (str): string with reStructuredText markups in it report_level (int): level of docutils report messages to print (1 info 2 warning 3 error 4 severe 5 none) Returns: docutils.nodes.document: a parsed docutils document """ parser = docutils.parsers.rst.Parser() components = (docutils.parsers.rst.Parser,) settings = docutils.frontend.OptionParser( components=components ).get_default_values() settings.report_level = report_level document = docutils.utils.new_document("rst-doc", settings=settings) parser.parse(text, document) return document def get_client_ip(request): """ Return the client IP address from an incoming HTTP request. Args: request (django.http.HttpRequest): the incoming HTTP request Returns: str: The client IP address """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: ip = x_forwarded_for.split(",")[0] else: ip = request.META.get("REMOTE_ADDR") return ip browsers_supported_image_mimes = set( [ "image/gif", "image/png", "image/jpeg", "image/bmp", "image/webp", "image/svg", "image/svg+xml", ] ) def context_processor(request): """ Django context processor used to inject variables in all swh-web templates. """ config = get_config() if ( hasattr(request, "user") and request.user.is_authenticated and not hasattr(request.user, "backend") ): # To avoid django.template.base.VariableDoesNotExist errors # when rendering templates when standard Django user is logged in. request.user.backend = "django.contrib.auth.backends.ModelBackend" return { "swh_object_icons": swh_object_icons, "available_languages": None, "swh_client_config": config["client_config"], "oidc_enabled": bool(config["keycloak"]["server_url"]), "browsers_supported_image_mimes": browsers_supported_image_mimes, + "keycloak": config["keycloak"], } class EnforceCSRFAuthentication(SessionAuthentication): """ Helper class to enforce CSRF validation on a DRF view when a user is not authenticated. """ def authenticate(self, request): user = getattr(request._request, "user", None) self.enforce_csrf(request) return (user, None) def resolve_branch_alias( snapshot: Dict[str, Any], branch: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """ Resolve branch alias in snapshot content. Args: snapshot: a full snapshot content branch: a branch alias contained in the snapshot Returns: The real snapshot branch that got aliased. """ while branch and branch["target_type"] == "alias": if branch["target"] in snapshot["branches"]: branch = snapshot["branches"][branch["target"]] else: from swh.web.common import archive snp = archive.lookup_snapshot( snapshot["id"], branches_from=branch["target"], branches_count=1 ) if snp and branch["target"] in snp["branches"]: branch = snp["branches"][branch["target"]] else: branch = None return branch class _NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] _HTML_WRITER = Writer() _HTML_WRITER.translator_class = _NoHeaderHTMLTranslator def rst_to_html(rst: str) -> str: """ Convert reStructuredText document into HTML. Args: rst: A string containing a reStructuredText document Returns: Body content of the produced HTML conversion. """ settings = { "initial_header_level": 2, } pp = publish_parts(rst, writer=_HTML_WRITER, settings_overrides=settings) return f'
{pp["html_body"]}
' def prettify_html(html: str) -> str: """ Prettify an HTML document. Args: html: Input HTML document Returns: The prettified HTML document """ return BeautifulSoup(html, "lxml").prettify() diff --git a/swh/web/templates/api/tokens.html b/swh/web/templates/api/tokens.html deleted file mode 100644 index 19ad64cd..00000000 --- a/swh/web/templates/api/tokens.html +++ /dev/null @@ -1,57 +0,0 @@ -{% extends "layout.html" %} - -{% comment %} -Copyright (C) 2020 The Software Heritage developers -See the AUTHORS file at the top-level directory of this distribution -License: GNU Affero General Public License version 3, or any later version -See top-level LICENSE file for more information -{% endcomment %} - -{% load render_bundle from webpack_loader %} -{% load swh_templatetags %} - -{% block title %} Web API bearer tokens – Software Heritage API {% endblock %} - -{% block header %} -{% render_bundle 'auth' %} -{% endblock %} - -{% block navbar-content %} -

Web API bearer tokens management

-{% endblock %} - -{% block content %} - -

- That interface enables to manage bearer tokens for Web API authentication. - A token has to be sent in HTTP authorization headers to make authenticated API requests. -

-

- For instance when using curl proceed as follows: -

curl -H "Authorization: Bearer ${TOKEN}" {{ request.scheme }}://{{ request.META.HTTP_HOST }}/api/...
-

- -
-
- - -
- - - - - - - -
Creation dateActions
-
- - - -{% endblock content %} \ No newline at end of file diff --git a/swh/web/templates/auth/profile.html b/swh/web/templates/auth/profile.html new file mode 100644 index 00000000..5641e8c8 --- /dev/null +++ b/swh/web/templates/auth/profile.html @@ -0,0 +1,105 @@ +{% extends "layout.html" %} + +{% comment %} +Copyright (C) 2020 The Software Heritage developers +See the AUTHORS file at the top-level directory of this distribution +License: GNU Affero General Public License version 3, or any later version +See top-level LICENSE file for more information +{% endcomment %} + +{% load render_bundle from webpack_loader %} +{% load swh_templatetags %} + +{% block title %} User profile – Software Heritage {% endblock %} + +{% block header %} +{% render_bundle 'auth' %} +{% endblock %} + +{% block navbar-content %} +

User profile

+{% endblock %} + +{% block content %} + + + +
+
+

+ Below are the details of your user account. + You can edit your personal information in the + + Software Heritage Account Management + interface. +

+ + + + + + + + + + + + + + + + + + + + + +
Username{{ user.username }}
First name{{ user.first_name }}
Last name{{ user.last_name }}
Email{{ user.email }}
Permissions: + {% for perm in user.get_all_permissions %} + {{ perm }}
+ {% endfor %} +
+
+ +
+

+ That interface enables to manage bearer tokens for Web API authentication. + A token has to be sent in HTTP authorization headers to make authenticated API requests. +

+

+ For instance when using curl proceed as follows: +

curl -H "Authorization: Bearer ${TOKEN}" {{ request.scheme }}://{{ request.META.HTTP_HOST }}/api/...
+

+
+
+ + +
+ + + + + + + +
Creation dateActions
+
+
+
+ + + +{% endblock content %} diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html index bb9119f7..f0a5de26 100644 --- a/swh/web/templates/layout.html +++ b/swh/web/templates/layout.html @@ -1,238 +1,240 @@ {% comment %} -Copyright (C) 2015-2019 The Software Heritage developers +Copyright (C) 2015-2020 The Software Heritage developers See the AUTHORS file at the top-level directory of this distribution License: GNU Affero General Public License version 3, or any later version See top-level LICENSE file for more information {% endcomment %} {% load js_reverse %} {% load static %} {% load render_bundle from webpack_loader %} {% load swh_templatetags %} {% block title %}{% endblock %} {% render_bundle 'vendors' %} {% render_bundle 'webapp' %} {% block header %}{% endblock %}
{% block content %}{% endblock %}
{% include "includes/global-modals.html" %}
back to top
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py index f2b4b596..a05c6d07 100644 --- a/swh/web/tests/auth/test_views.py +++ b/swh/web/tests/auth/test_views.py @@ -1,527 +1,566 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json from urllib.parse import urljoin, urlparse import uuid from keycloak.exceptions import KeycloakError import pytest from django.contrib.auth.models import AnonymousUser, User from django.http import QueryDict from swh.web.auth.models import OIDCUser, OIDCUserOfflineTokens from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID from swh.web.common.utils import reverse +from swh.web.config import get_config from swh.web.tests.django_asserts import assert_contains from swh.web.tests.utils import ( check_html_get_response, check_http_get_response, check_http_post_response, ) from swh.web.urls import _default_view as homepage_view from . import sample_data from .keycloak_mock import mock_keycloak @pytest.mark.django_db def test_oidc_login_views_success(client, mocker): """ Simulate a successful login authentication with OpenID Connect authorization code flow with PKCE. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # user initiates login process login_url = reverse("oidc-login") # should redirect to Keycloak authentication page in order # for a user to login with its username / password response = check_html_get_response(client, login_url, status_code=302) request = response.wsgi_request assert isinstance(request.user, AnonymousUser) parsed_url = urlparse(response["location"]) authorization_url = kc_oidc_mock.well_known()["authorization_endpoint"] query_dict = QueryDict(parsed_url.query) # check redirect url is valid assert urljoin(response["location"], parsed_url.path) == authorization_url assert "client_id" in query_dict assert query_dict["client_id"] == OIDC_SWH_WEB_CLIENT_ID assert "response_type" in query_dict assert query_dict["response_type"] == "code" assert "redirect_uri" in query_dict assert query_dict["redirect_uri"] == reverse("oidc-login-complete", request=request) assert "code_challenge_method" in query_dict assert query_dict["code_challenge_method"] == "S256" assert "scope" in query_dict assert query_dict["scope"] == "openid" assert "state" in query_dict assert "code_challenge" in query_dict # check a login_data has been registered in user session assert "login_data" in request.session login_data = request.session["login_data"] assert "code_verifier" in login_data assert "state" in login_data assert "redirect_uri" in login_data assert login_data["redirect_uri"] == query_dict["redirect_uri"] # once a user has identified himself in Keycloak, he is # redirected to the 'oidc-login-complete' view to # login in Django. # generate authorization code / session state in the same # manner as Keycloak code = f"{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}" session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "code": code, "state": login_data["state"], "session_state": session_state, }, ) # login process finalization, should redirect to root url by default response = check_html_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request assert response["location"] == request.build_absolute_uri("/") # user should be authenticated assert isinstance(request.user, OIDCUser) # check remote user has not been saved to Django database with pytest.raises(User.DoesNotExist): User.objects.get(username=request.user.username) @pytest.mark.django_db def test_oidc_logout_view_success(client, mocker): """ Simulate a successful logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") kc_oidc_mock.authorization_code.assert_called() # user initiates logout oidc_logout_url = reverse("oidc-logout") # should redirect to logout page response = check_html_get_response(client, oidc_logout_url, status_code=302) request = response.wsgi_request logout_url = reverse("logout", query_params={"remote_user": 1}) assert response["location"] == request.build_absolute_uri(logout_url) # should have been logged out in Keycloak kc_oidc_mock.logout.assert_called_with(sample_data.oidc_profile["refresh_token"]) # check effective logout in Django assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_view_failure(client, mocker): """ Simulate a failed authentication with OpenID Connect. """ # mock Keycloak client mock_keycloak(mocker, auth_success=False) # user initiates login process login_url = reverse("oidc-login") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request # no users should be logged in assert isinstance(request.user, AnonymousUser) # Simulate possible errors with OpenID Connect in the login complete view. def test_oidc_login_complete_view_no_login_data(client, mocker): # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) assert_contains( response, "Login process has not been initialized.", status_code=500 ) def test_oidc_login_complete_view_missing_parameters(client, mocker): # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # user initiates login process login_url = reverse("oidc-login-complete") # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Missing query parameters for authentication.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) def test_oidc_login_complete_wrong_csrf_token(client, mocker): # mock Keycloak client mock_keycloak(mocker) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # user initiates login process login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": "some-state"} ) # should render an error page response = check_html_get_response( client, login_url, status_code=400, template_used="error.html" ) request = response.wsgi_request assert_contains( response, "Wrong CSRF token, aborting login process.", status_code=400 ) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_login_complete_wrong_code_verifier(client, mocker): # mock Keycloak client mock_keycloak(mocker, auth_success=False) # simulate login process has been initialized session = client.session session["login_data"] = { "code_verifier": "", "state": str(uuid.uuid4()), "redirect_uri": "", "next_path": "", "prompt": "", } session.save() # check authentication error is reported login_url = reverse( "oidc-login-complete", query_params={"code": "some-code", "state": session["login_data"]["state"]}, ) # should render an error page response = check_html_get_response( client, login_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, "User authentication failed.", status_code=500) # no user should be logged in assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_logout_view_failure(client, mocker): """ Simulate a failed logout operation with OpenID Connect. """ # mock Keycloak client kc_oidc_mock = mock_keycloak(mocker) # login our test user client.login(code="", code_verifier="", redirect_uri="") err_msg = "Authentication server error" kc_oidc_mock.logout.side_effect = Exception(err_msg) # user initiates logout process logout_url = reverse("oidc-logout") # should render an error page response = check_html_get_response( client, logout_url, status_code=500, template_used="error.html" ) request = response.wsgi_request assert_contains(response, err_msg, status_code=500) # user should be logged out from Django anyway assert isinstance(request.user, AnonymousUser) @pytest.mark.django_db def test_oidc_silent_refresh_failure(client, mocker): # mock Keycloak client mock_keycloak(mocker) next_path = reverse("swh-web-homepage") # silent session refresh initialization login_url = reverse( "oidc-login", query_params={"next_path": next_path, "prompt": "none"} ) response = check_http_get_response(client, login_url, status_code=302) request = response.wsgi_request login_data = request.session["login_data"] # check prompt value has been registered in user session assert "prompt" in login_data assert login_data["prompt"] == "none" # simulate a failed silent session refresh session_state = str(uuid.uuid4()) login_complete_url = reverse( "oidc-login-complete", query_params={ "error": "login_required", "state": login_data["state"], "session_state": session_state, }, ) # login process finalization, should redirect to logout page response = check_http_get_response(client, login_complete_url, status_code=302) request = response.wsgi_request logout_url = reverse( "logout", query_params={"next_path": next_path, "remote_user": 1} ) assert response["location"] == logout_url def test_view_rendering_when_user_not_set_in_request(request_factory): request = request_factory.get("/") # Django RequestFactory do not set any user by default assert not hasattr(request, "user") response = homepage_view(request) assert response.status_code == 200 def test_oidc_generate_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-generate-bearer-token") check_http_post_response(client, url, data={"password": "secret"}, status_code=403) def _generate_bearer_token(client, password): client.login( code="code", code_verifier="code-verifier", redirect_uri="redirect-uri" ) url = reverse("oidc-generate-bearer-token") return client.post( url, data={"password": password}, content_type="application/json" ) @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_success(client, mocker): """ User with correct credentials should be allowed to generate a token. """ kc_mock = mock_keycloak(mocker) password = "secret" response = _generate_bearer_token(client, password) user = response.wsgi_request.user assert response.status_code == 200 assert response.content.decode("ascii") == kc_mock.offline_token( username=user.username, password=password ) @pytest.mark.django_db def test_oidc_generate_bearer_token_authenticated_user_failure(client, mocker): """ User with wrong credentials should not be allowed to generate a token. """ response_code = 401 kc_mock = mock_keycloak(mocker) kc_mock.offline_token.side_effect = KeycloakError( error_message="Invalid password", response_code=response_code ) response = _generate_bearer_token(client, password="invalid-password") assert response.status_code == response_code def test_oidc_list_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) check_http_get_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_list_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to list his tokens. """ mock_keycloak(mocker) nb_tokens = 3 password = "secret" for _ in range(nb_tokens): response = _generate_bearer_token(client, password) url = reverse( "oidc-list-bearer-tokens", query_params={"draw": 1, "start": 0, "length": 10} ) response = check_http_get_response(client, url, status_code=200) tokens_data = list(reversed(json.loads(response.content.decode("utf-8"))["data"])) for oidc_token in OIDCUserOfflineTokens.objects.all(): assert ( oidc_token.creation_date.isoformat() == tokens_data[oidc_token.id - 1]["creation_date"] ) def test_oidc_get_bearer_token_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-get-bearer-token") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_get_bearer_token(client, mocker): """ User with correct credentials should be allowed to display a token. """ mock_keycloak(mocker) nb_tokens = 3 password = "secret" for i in range(nb_tokens): response = _generate_bearer_token(client, password) token = response.content url = reverse("oidc-get-bearer-token") response = check_http_post_response( client, url, status_code=200, data={"password": password, "token_id": i + 1}, content_type="text/plain", ) assert response.content == token @pytest.mark.django_db def test_oidc_get_bearer_token_invalid_password(client, mocker): """ User with wrong credentials should not be allowed to display a token. """ mock_keycloak(mocker) password = "secret" _generate_bearer_token(client, password) url = reverse("oidc-get-bearer-token") check_http_post_response( client, url, status_code=401, data={"password": "invalid-password", "token_id": 1}, ) def test_oidc_revoke_bearer_tokens_anonymous_user(client): """ Anonymous user should be refused access with forbidden response. """ url = reverse("oidc-revoke-bearer-tokens") check_http_post_response(client, url, status_code=403) @pytest.mark.django_db def test_oidc_revoke_bearer_tokens(client, mocker): """ User with correct credentials should be allowed to revoke tokens. """ mock_keycloak(mocker) nb_tokens = 3 password = "secret" for _ in range(nb_tokens): _generate_bearer_token(client, password) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=200, data={"password": password, "token_ids": [1]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 2 check_http_post_response( client, url, status_code=200, data={"password": password, "token_ids": [2, 3]}, ) assert len(OIDCUserOfflineTokens.objects.all()) == 0 @pytest.mark.django_db def test_oidc_revoke_bearer_token_invalid_password(client, mocker): """ User with wrong credentials should not be allowed to revoke tokens. """ mock_keycloak(mocker) password = "secret" _generate_bearer_token(client, password) url = reverse("oidc-revoke-bearer-tokens") check_http_post_response( client, url, status_code=401, data={"password": "invalid-password", "token_ids": [1]}, ) + + +def test_oidc_profile_view_anonymous_user(client): + """ + Non authenticated users should be redirected to login page when + requesting profile view. + """ + url = reverse("oidc-profile") + login_url = reverse("oidc-login", query_params={"next_path": url}) + resp = check_html_get_response(client, url, status_code=302) + assert resp["location"] == login_url + + +@pytest.mark.django_db +def test_oidc_profile_view(client, mocker): + """ + Authenticated users should be able to request the profile page + and link to Keycloak account UI should be present. + """ + url = reverse("oidc-profile") + kc_config = get_config()["keycloak"] + user_permissions = ["perm1", "perm2"] + mock_keycloak(mocker, user_permissions=user_permissions) + client.login(code="", code_verifier="", redirect_uri="") + resp = check_html_get_response( + client, url, status_code=200, template_used="auth/profile.html" + ) + user = resp.wsgi_request.user + kc_account_url = ( + f"{kc_config['server_url']}realms/{kc_config['realm_name']}/account/" + ) + assert_contains(resp, kc_account_url) + assert_contains(resp, user.username) + assert_contains(resp, user.first_name) + assert_contains(resp, user.last_name) + assert_contains(resp, user.email) + for perm in user_permissions: + assert_contains(resp, perm)